package drds.data_propagate.task.manager;

import drds.data_propagate.common.DataPropagateException;
import drds.data_propagate.common.utils.JsonUtils;
import drds.data_propagate.common.zookeeper.ZkClientx;
import drds.data_propagate.entry.position.EntryPosition;
import drds.data_propagate.filter.aviater.AviaterRegexFilter;
import drds.data_propagate.metadata.MemoryMetaDataManager;
import drds.data_propagate.parse.*;
import drds.data_propagate.parse.binlog_event_position_manager.*;
import drds.data_propagate.parse.ha.HaController;
import drds.data_propagate.parse.ha.HeartBeatHaController;
import drds.data_propagate.parse.table_meta_data.TableMetaDataClassPathXmlApplicationContextManager;
import drds.data_propagate.parse.table_meta_data.TableMetaDataFactoryImpl;
import drds.data_propagate.parse.table_meta_data.TableMetaDataStore;
import drds.data_propagate.sink.entry.EntryEventListSink;
import drds.data_propagate.sink.entry.group.GroupEventListSink;
import drds.data_propagate.store.AbstractStoreScavenge;
import drds.data_propagate.store.BatchMode;
import drds.data_propagate.store.MemoryEventStore;
import drds.data_propagate.task.manager.model.*;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.BooleanUtils;
import org.apache.commons.lang.StringUtils;
import org.springframework.util.CollectionUtils;

import java.net.InetSocketAddress;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/**
 * 单个实例，比如一个taskId会独立一个实例
 */
@Slf4j
public class TaskImpl extends drds.data_propagate.task.TaskImpl {
    public static final String FILTER = ".*\\\\..*";
    protected String filter = FILTER; // 过滤表达式
    protected Parameter parameters; // 对应参数

    public TaskImpl(TaskConfiguration taskConfiguration) {
        this.parameters = taskConfiguration.getParameter();
        this.taskIdSequense = taskConfiguration.getId();
        this.taskId = taskConfiguration.getTaskId();
        this.filter = taskConfiguration.getFilter();

        log.info("init TaskImpl for {}-{} with parameters:{}", taskIdSequense, taskId, parameters);

        // 初始化metaManager
        initMetaDataManager();
        // 初始化eventStore
        initEventStore();
        // 初始化eventSink
        initEventSink();
        // 初始化eventParser;
        initEventParser();

        // 基础工具，需要提前start，会有先订阅再根据filter条件启动parse的需求

        if (!metaDataManager.isStart()) {
            metaDataManager.start();
        }
        log.info("init successful....");
    }

    public void start() {
        // 初始化metaManager
        log.info("start CannalInstance for {}-{} with parameters:{}", taskIdSequense, taskId, parameters);
        super.start();
    }

    protected void initMetaDataManager() {
        log.info("init metaDataManager begin...");
        metaDataManager = new MemoryMetaDataManager();

        log.info("init metaDataManager end! \n\t load MetaDataManager:{} ", metaDataManager.getClass().getName());
    }

    protected void initEventStore() {
        log.info("init eventStore begin...");
        MemoryEventStore memoryEventStore = new MemoryEventStore(BatchMode.item_size);//默认是item_size
        memoryEventStore.setBufferSize(parameters.getMemoryStorageBufferSize());
        memoryEventStore.setBufferMemUnit(parameters.getMemoryStorageBufferMemUnit());
        memoryEventStore.setBatchMode(BatchMode.valueOf(parameters.getStorageScavengeMode().name()));
        memoryEventStore.setDdlIsolation(parameters.getDdlIsolation());
        memoryEventStore.setRaw(parameters.getMemoryStorageRawEntry());
        eventStore = memoryEventStore;

        if (eventStore instanceof AbstractStoreScavenge) {
            StorageScavengeMode scavengeMode = parameters.getStorageScavengeMode();
            AbstractStoreScavenge eventScavengeStore = (AbstractStoreScavenge) eventStore;
            eventScavengeStore.setTaskId(taskId);
            eventScavengeStore.setMetaDataManager(metaDataManager);
            eventScavengeStore.setOnAck(scavengeMode.isOnAck());
            eventScavengeStore.setOnFull(scavengeMode.isOnFull());
            eventScavengeStore.setOnSchedule(scavengeMode.isOnSchedule());
            if (scavengeMode.isOnSchedule()) {
                eventScavengeStore.setScavengeSchedule(parameters.getScavengeSchdule());
            }
        }
        log.info("init eventStore end! \n\t load EventStore:{}", eventStore.getClass().getName());
    }

    protected void initEventSink() {
        log.info("init eventListSink begin...");

        int groupSize = getGroupSize();
        if (groupSize <= 1) {
            eventListSink = new EntryEventListSink();
        } else {
            eventListSink = new GroupEventListSink(groupSize);
        }

        if (eventListSink instanceof EntryEventListSink) {
            ((EntryEventListSink) eventListSink).setNotUseTransactionTimelineBarrier(false);
            ((EntryEventListSink) eventListSink).setEventStore(getEventStore());
        }
        log.info("init eventListSink end! \n\t load EventListSink:{}", eventListSink.getClass().getName());
    }

    protected void initEventParser() {
        log.info("init eventParser begin...");
        SourcingType type = parameters.getSourcingType();

        List<List<DataSourcing>> groupDbAddresses = parameters.getGroupDbAddresses();
        if (!CollectionUtils.isEmpty(groupDbAddresses)) {
            int size = groupDbAddresses.get(0).size();// 取第一个分组的数量，主备分组的数量必须一致
            List<EventParser> eventParsers = new ArrayList<EventParser>();
            for (int i = 0; i < size; i++) {
                List<InetSocketAddress> dbAddress = new ArrayList<InetSocketAddress>();
                SourcingType lastType = null;
                for (List<DataSourcing> groupDbAddress : groupDbAddresses) {
                    if (lastType != null && !lastType.equals(groupDbAddress.get(i).getType())) {
                        throw new DataPropagateException(
                                String.format("master/slave Sourcing eventType is unmatch. %s vs %s", lastType,
                                        groupDbAddress.get(i).getType()));
                    }

                    lastType = groupDbAddress.get(i).getType();
                    dbAddress.add(groupDbAddress.get(i).getDbAddress());
                }

                // 初始化其中的一个分组parser
                eventParsers.add(doInitEventParser(lastType, dbAddress));
            }

            if (eventParsers.size() > 1) { // 如果存在分组，构造分组的parser
                GroupEventParser groupEventParser = new GroupEventParser();
                groupEventParser.setEventParserList(eventParsers);
                this.eventParser = groupEventParser;
            } else {
                this.eventParser = eventParsers.get(0);
            }
        } else {
            // 创建一个空数据库地址的parser，可能使用了tddl指定地址，启动的时候才会从tddl获取地址
            this.eventParser = doInitEventParser(type, new ArrayList<InetSocketAddress>());
        }

        log.info("init eventParser end! \n\t load EventParser:{}", eventParser.getClass().getName());
    }

    private EventParser doInitEventParser(SourcingType sourcingType, List<InetSocketAddress> inetSocketAddressList) {
        EventParser eventParser;
        if (sourcingType.isMysql()) {
            MysqlEventParser mysqlEventParser = new MysqlEventParser();
            mysqlEventParser.setTaskId(taskId);
            // 编码参数
            mysqlEventParser.setConnectionCharset(Charset.forName(parameters.getConnectionCharset()));
            mysqlEventParser.setConnectionCharsetNumber(parameters.getConnectionCharsetNumber());
            // 网络相关参数
            mysqlEventParser.setDefaultConnectionTimeoutInSeconds(parameters.getDefaultConnectionTimeoutInSeconds());
            mysqlEventParser.setSendBufferSize(parameters.getSendBufferSize());
            mysqlEventParser.setReceiveBufferSize(parameters.getReceiveBufferSize());
            // 心跳检查参数
            mysqlEventParser.setHeartBeatEnable(parameters.getDetectingEnable());
            mysqlEventParser.setHeartBeatSql(parameters.getDetectingSQL());
            mysqlEventParser.setDetectingIntervalInSeconds(parameters.getDetectingIntervalInSeconds());
            // 数据库信息参数
            mysqlEventParser.setSlaveId(parameters.getSlaveId());
            if (!CollectionUtils.isEmpty(inetSocketAddressList)) {
                mysqlEventParser.setMasterAuthenticationInfo(new AuthenticationInfo(inetSocketAddressList.get(0), parameters.getDbUsername(),
                        parameters.getDbPassword(), parameters.getDefaultDatabaseName()));

                if (inetSocketAddressList.size() > 1) {
                    mysqlEventParser
                            .setStandbyAuthenticationInfo(new AuthenticationInfo(inetSocketAddressList.get(1), parameters.getDbUsername(),
                                    parameters.getDbPassword(), parameters.getDefaultDatabaseName()));
                }
            }

            if (!CollectionUtils.isEmpty(parameters.getPositions())) {
                EntryPosition masterPosition = JsonUtils.unmarshalFromString(parameters.getPositions().get(0),
                        EntryPosition.class);
                // binlog位置参数
                mysqlEventParser.setMasterEntryPosition(masterPosition);

                if (parameters.getPositions().size() > 1) {
                    EntryPosition standbyPosition = JsonUtils.unmarshalFromString(parameters.getPositions().get(1),
                            EntryPosition.class);
                    mysqlEventParser.setStandbyEntryPosition(standbyPosition);
                }
            }
            mysqlEventParser.setFallbackIntervalInSeconds(parameters.getFallbackIntervalInSeconds());
            //mysqlEventParser.getProfilingEnabled().set(false);
            mysqlEventParser.setFilterTableError(parameters.getFilterTableError());
            mysqlEventParser.setParallel(parameters.getParallel());
            mysqlEventParser.setGtidMode(BooleanUtils.toBoolean(parameters.getGtidEnable()));
            // table_meta_data
            if (parameters.getTsdbSnapshotInterval() != null) {
                mysqlEventParser.setTsdbSnapshotInterval(parameters.getTsdbSnapshotInterval());
            }
            if (parameters.getTsdbSnapshotExpire() != null) {
                mysqlEventParser.setTsdbSnapshotExpire(parameters.getTsdbSnapshotExpire());
            }
            boolean tsdbEnable = BooleanUtils.toBoolean(parameters.getTsdbEnable());
            if (tsdbEnable) {
                mysqlEventParser.setTableMetaDataFactory(new TableMetaDataFactoryImpl() {

                    @Override
                    public void destory(String taskId) {
                        TableMetaDataClassPathXmlApplicationContextManager.destory(taskId);
                    }

                    @Override
                    public TableMetaDataStore build(String taskId, String springXml) {
                        try {
                            System.setProperty("canal.task.tsdb.url", parameters.getTsdbJdbcUrl());
                            System.setProperty("canal.task.tsdb.dbUsername", parameters.getTsdbJdbcUserName());
                            System.setProperty("canal.task.tsdb.dbPassword", parameters.getTsdbJdbcPassword());

                            return TableMetaDataClassPathXmlApplicationContextManager.build(taskId,
                                    "classpath:spring/table_meta_data/mysql-table_meta_data.xml");
                        } finally {
                            System.setProperty("canal.task.tsdb.url", "");
                            System.setProperty("canal.task.tsdb.dbUsername", "");
                            System.setProperty("canal.task.tsdb.dbPassword", "");
                        }
                    }
                });
                mysqlEventParser.setEnableTableMetaDataCache(tsdbEnable);
            }
            eventParser = mysqlEventParser;
        } else {
            throw new DataPropagateException("unsupport SourcingType for " + sourcingType);
        }

        // add transaction authentication_info at 2012-12-06
        if (eventParser instanceof AbstractEventParser) {
            AbstractEventParser abstractEventParser = (AbstractEventParser) eventParser;
            abstractEventParser.setEventListqQueueSize(parameters.getTransactionSize());
            abstractEventParser.setBinLogEventPositionManager(initLogPositionManager());

            abstractEventParser.setEventListSink(this.getEventListSink());

            if (StringUtils.isNotEmpty(filter)) {
                AviaterRegexFilter aviaterRegexFilter = new AviaterRegexFilter(filter);
                abstractEventParser.setEventFilter(aviaterRegexFilter);
            }

            // 设置黑名单
            if (StringUtils.isNotEmpty(parameters.getBlackFilter())) {
                AviaterRegexFilter aviaterRegexFilter = new AviaterRegexFilter(parameters.getBlackFilter());
                abstractEventParser.setEventBlackFilter(aviaterRegexFilter);
            }
        }
        if (eventParser instanceof MysqlEventParser) {
            MysqlEventParser mysqlEventParser = (MysqlEventParser) eventParser;

            // 初始化haController，绑定与eventParser的关系，haController会控制eventParser
            HaController haController = initHaController();
            mysqlEventParser.setHaController(haController);
        }
        return eventParser;
    }

    protected HaController initHaController() {
        log.info("init haController begin...");

        HaController haController = new HeartBeatHaController();
        ((HeartBeatHaController) haController).setDetectingRetryTimes(parameters.getDetectingRetryTimes());
        ((HeartBeatHaController) haController).setSwitchEnable(parameters.getHeartbeatHaEnable());
        log.info("init haController end! \n\t load HaController:{}", haController.getClass().getName());

        return haController;
    }

    protected BinLogEventPositionManager initLogPositionManager() {
        log.info("init logPositionPersistManager begin...");
        IndexMode indexMode = parameters.getIndexMode();
        BinLogEventPositionManager binLogEventPositionManager;
        if (indexMode.isMemory()) {
            binLogEventPositionManager = new MemoryBinLogEventPositionManager();
        } else if (indexMode.isZookeeper()) {
            binLogEventPositionManager = new ZooKeeperBinLogEventPositionManager(getZkclientx());
        } else if (indexMode.isMixed()) {
            MemoryBinLogEventPositionManager memoryLogPositionManager = new MemoryBinLogEventPositionManager();
            ZooKeeperBinLogEventPositionManager zooKeeperLogPositionManager = new ZooKeeperBinLogEventPositionManager(getZkclientx());
            binLogEventPositionManager = new PeriodMixedBinLogEventPositionManager(memoryLogPositionManager,
                    zooKeeperLogPositionManager, 1000L);
        } else if (indexMode.isMeta()) {
            binLogEventPositionManager = new MetaBinLogEventPositionManager(metaDataManager);
        } else if (indexMode.isMemoryMetaFailback()) {
            MemoryBinLogEventPositionManager primary = new MemoryBinLogEventPositionManager();
            MetaBinLogEventPositionManager secondary = new MetaBinLogEventPositionManager(metaDataManager);

            binLogEventPositionManager = new FailbackBinLogEventPositionManager(primary, secondary);
        } else {
            throw new DataPropagateException("unsupport indexMode for " + indexMode);
        }

        log.info("init binLogEventPositionManager end! \n\t load BinLogEventPositionManager:{}",
                binLogEventPositionManager.getClass().getName());

        return binLogEventPositionManager;
    }

    protected void startEventParserInternal(EventParser eventParser, boolean isGroup) {
        if (eventParser instanceof AbstractEventParser) {
            @SuppressWarnings("unused")
            AbstractEventParser abstractEventParser = (AbstractEventParser) eventParser;

        }

        super.startEventParserInternal(eventParser, isGroup);
    }

    private int getGroupSize() {
        List<List<DataSourcing>> groupDbAddresses = parameters.getGroupDbAddresses();
        if (!CollectionUtils.isEmpty(groupDbAddresses)) {
            return groupDbAddresses.get(0).size();
        } else {
            // 可能是基于tddl的启动
            return 1;
        }
    }

    private synchronized ZkClientx getZkclientx() {
        // 做一下排序，保证相同的机器只使用同一个链接
        List<String> zkClusters = new ArrayList<String>(parameters.getZkClusters());
        Collections.sort(zkClusters);

        return ZkClientx.getZkClient(StringUtils.join(zkClusters, ";"));
    }

}
